package org.hope.lee.producer.queue;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 *   
 *  @ProjectName: base-project 
 *  @Description: 顺序消费 消费者1
 *  @author: lisen
 *  @date: 2017/3/28  
 */
public class ConsumerQueue1 {

    public ConsumerQueue1() throws Exception {
        String group_name = "order_consumer";
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
        consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //订阅的主题， 以及过滤的标签内容
        consumer.subscribe("TopicTest", "*");
        //注册监听
        consumer.registerMessageListener(new Listener());
        consumer.start();
        System.out.println("Consumer Started.....");
    }

    /**
     * 这里实现MessageListenerOrderLy接口就是为了达到顺序消费的目的,
     * 如果是使用MessageListenerConcurrently,则需要把线程池改为单线程模式。
     * 但是也不能保证说一定会顺序消费，因为如果master宕机了，导致写入队列的数量上
     * 出现变化。
     *
     * 从消费端，如果想保证这批消息是M1消费完成再消费M2的话，可以使用MessageListenerOrderly接口，但是这样的话会有以下问题：
     * 1. 遇到消息失败的消息，无法跳过，当前队列消费暂停
     * 2. 目前版本的RocketMQ的MessageListenerOrderly是不能从slave消费消息的。
     */
    class Listener implements MessageListenerOrderly {
        private Random random = new Random();

        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
            context.setAutoCommit(true);
            for(MessageExt msg : list) {
                System.out.println(msg + ", content:" + new String(msg.getBody()));
            }
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(5)); //随机休眠时间，模拟业务处理时间

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    }
    public static void main(String[] args) throws Exception {
        ConsumerQueue1 c = new ConsumerQueue1();

    }
}
